In [1]:
import numpy as np
import torch
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import unittest
In [2]:
def runTests(test_class):
    unittest.TextTestRunner().run(
        unittest.TestLoader().loadTestsFromModule(
            test_class()
        )
    )
In [3]:
VOCAB = {
"__pad__": 0,
"__bos__": 1,
"__eos__": 2,
"__unk__": 3,
"dog": 4,
"cat": 5,
"puppy": 6
}
CHAR_VOCAB = {
"__c_pad__": 0,
"__bot__": 1,
"__eot__": 2,
"__c_unk__": 3,
"__pad__": 4,
"__bos__": 5,
"__eos__": 6,
"a": 7,
"c": 8,
"d": 9,
"g": 10,
"o": 11,
"p": 12,
"t": 13,
"u": 14,
"y": 15
}
TAG_VOCAB = {
"__pad__": 0,
"__bos__": 1,
"__eos__": 2,
"animal_class": 3,
"offspring": 4
}
maxlen = 10
max_tokenlen = 15

def seq2idx(items, vocab, begin="__bos__", end="__eos__"):
    seq = (
        tuple([vocab[begin]])
        + tuple([
            vocab[item]
            for item in items
        ])
        + tuple([vocab[end]]))
    #print(seq)
    return seq

def padded_seq(seq, maxlen, pad_value):
    seqlen = min(maxlen, len(seq))
    seq = tuple(seq[:seqlen]) + tuple([pad_value]*(maxlen - seqlen))
    return seq, seqlen
def get_chars_seq(sentence, char_vocab):
    char_seq = tuple([["__bos__"]]) + tuple([
        tuple(w) for w in sentence
    ]) + tuple([["__eos__"]])
    char_seq = tuple([
        padded_seq(
            seq2idx(
                chars,
                char_vocab,
                begin="__bot__",
                end="__eot__"
            ),
            max_tokenlen,
            char_vocab["__c_pad__"]
        )[0]
        for chars in char_seq
    ])
    padded_char_value = padded_seq(
        seq2idx(
            ["__pad__"],
            char_vocab,
            begin="__bot__",
            end="__eot__"
        ),
        max_tokenlen,
        char_vocab["__c_pad__"]
    )[0]
    return char_seq, padded_char_value
def transform(sentence_tags_item, vocab, char_vocab, tag_vocab):
    sentence, tags = sentence_tags_item
    word_tensor, word_len = padded_seq(
        seq2idx(sentence, vocab),
        maxlen,
        vocab["__pad__"]
    )
    tag_tensor, tags_len = padded_seq(
        seq2idx(tags, tag_vocab),
        maxlen,
        tag_vocab["__pad__"]
    )
    assert word_len == tags_len, (
        "Mismatch between padded word seq [{}]"
        " and padded tag seq [{}]"
    ).format(word_len, tags_len)
    char_seq, padded_char_value = get_chars_seq(sentence, char_vocab)
    char_tensor, char_word_len = padded_seq(char_seq, maxlen, padded_char_value)
    assert word_len == char_word_len, (
        "Mismatch between padded word seq [{}]"
        " and padded char based seq [{}]"
    ).format(word_len, char_word_len)
    seq_len = word_len
    return word_tensor, char_tensor, tag_tensor, seq_len
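A quick sketch of what these helpers return, assuming the VOCAB, CHAR_VOCAB, maxlen and max_tokenlen defined above:

# seq2idx wraps the token ids with __bos__/__eos__:
#   seq2idx(["dog", "cat"], VOCAB) -> (1, 4, 5, 2)
# padded_seq right-pads to maxlen and also returns the unpadded length:
#   padded_seq((1, 4, 5, 2), maxlen, VOCAB["__pad__"]) -> ((1, 4, 5, 2, 0, 0, 0, 0, 0, 0), 4)
# get_chars_seq builds one padded row of char ids per token (plus __bos__/__eos__ rows),
# each of length max_tokenlen, so transform yields shapes
# (maxlen,), (maxlen, max_tokenlen), (maxlen,) plus the scalar seq_len.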
In [4]:
char_seq, padded_char_value = get_chars_seq(["dog", "cat", "dog", "puppy"], CHAR_VOCAB)
char_tensor, char_word_len = padded_seq(char_seq, maxlen, padded_char_value)
word_tensor, char_tensor, tag_tensor, seq_len = transform((
["dog", "cat", "dog", "puppy"],
["animal_class", "animal_class", "animal_class", "offspring"]
), VOCAB, CHAR_VOCAB, TAG_VOCAB)
np.array(word_tensor).shape, np.array(char_tensor).shape, np.array(tag_tensor).shape, seq_len
Out[4]:
((10,), (10, 15), (10,), 6)
In [5]:
class TestTransforms(unittest.TestCase):
    def test_seq2idx(self):
        self.assertEqual(
            seq2idx(["dog", "cat", "dog", "puppy"], VOCAB),
            (1, 4, 5, 4, 6, 2)
        )

    def test_padded_seq(self):
        self.assertEqual(
            padded_seq(
                seq2idx(
                    ["dog", "cat", "dog", "puppy"],
                    VOCAB
                ),
                maxlen,
                VOCAB["__pad__"]
            ),
            ((1, 4, 5, 4, 6, 2, 0, 0, 0, 0), 6)
        )

    def test_padded_char_seq(self):
        char_seq, padded_char_value = get_chars_seq(["dog", "cat", "dog", "puppy"], CHAR_VOCAB)
        char_tensor, char_word_len = padded_seq(char_seq, maxlen, padded_char_value)
        self.assertEqual(
            np.array(char_tensor).shape,
            (maxlen, max_tokenlen)
        )

    def test_transform(self):
        word_tensor, char_tensor, tag_tensor, seq_len = transform(
            (
                ["dog", "cat", "dog", "puppy"],
                ["animal_class", "animal_class", "animal_class", "offspring"]
            ),
            VOCAB,
            CHAR_VOCAB,
            TAG_VOCAB
        )
        self.assertEqual(
            (
                np.array(word_tensor).shape,
                np.array(char_tensor).shape,
                np.array(tag_tensor).shape,
                seq_len
            ), ((10,), (10, 15), (10,), 6)
        )
In [6]:
runTests(TestTransforms)
In [7]:
class SentenceDataset(Dataset):
    def __init__(
        self,
        sentence_tags_items,
        transform,
        vocab,
        char_vocab,
        tag_vocab
    ):
        self.sentence_tags_items = sentence_tags_items
        self.transform = transform
        self.vocab = vocab
        self.char_vocab = char_vocab
        self.tag_vocab = tag_vocab

    def __getitem__(self, idx):
        word_tensor, char_tensor, tag_tensor, seq_len = self.transform(
            self.sentence_tags_items[idx],
            self.vocab,
            self.char_vocab,
            self.tag_vocab
        )
        word_tensor = torch.from_numpy(np.asarray(word_tensor))
        char_tensor = torch.from_numpy(np.asarray(char_tensor))
        tag_tensor = torch.from_numpy(np.asarray(tag_tensor))
        seq_len = torch.from_numpy(np.asarray([seq_len]))
        return word_tensor, char_tensor, tag_tensor, seq_len

    def __len__(self):
        return len(self.sentence_tags_items)
In [8]:
sentence_tag_items = [
(
["dog", "cat", "dog", "puppy"],
["animal_class", "animal_class", "animal_class", "offspring"]
),
(
["dog", "cat", "cat", "puppy"],
["animal_class", "animal_class", "animal_class", "offspring"]
),
(
["dog", "puppy", "dog", "puppy"],
["animal_class", "offspring", "animal_class", "offspring"]
),
]
In [9]:
sent_dataset = SentenceDataset(
sentence_tag_items,
transform,
VOCAB,
CHAR_VOCAB,
TAG_VOCAB
)
train_loader = DataLoader(sent_dataset, batch_size=10, shuffle=True, num_workers=1)
In [10]:
word_tensors, char_tensors, tag_tensors, seq_len = next(iter(train_loader))
word_tensors.size(), char_tensors.size(), tag_tensors.size(), seq_len.size()
Out[10]:
(torch.Size([3, 10]), torch.Size([3, 10, 15]), torch.Size([3, 10]), torch.Size([3, 1]))
In [11]:
seq_len.size()
Out[11]:
torch.Size([3, 1])
In [12]:
conv1d = torch.nn.Conv1d(5, 10, 1, dilation=2)
In [13]:
torch.rand(2,5,4).size()
Out[13]:
torch.Size([2, 5, 4])
In [14]:
conv1d(Variable(torch.rand(2,5,4), requires_grad=False)).size()
Out[14]:
torch.Size([2, 10, 4])
In [15]:
emb = torch.nn.Embedding(10, 5)
In [16]:
embeddings = emb(Variable(torch.LongTensor([[1,2,4,5],[4,3,2,9]]), requires_grad=False))
embeddings.size()
Out[16]:
torch.Size([2, 4, 5])
In [17]:
embeddings.permute(0, 2, 1).size()
Out[17]:
torch.Size([2, 5, 4])
In [18]:
conv1d(embeddings.permute(0, 2, 1))
Out[18]:
In [19]:
conv1d(embeddings.permute(0, 2, 1)).max(2)[1].size()
Out[19]:
torch.Size([2, 10])
In [20]:
embeddings.unsqueeze(1).size()
Out[20]:
torch.Size([2, 1, 4, 5])
In [21]:
char_tensors.size()
Out[21]:
torch.Size([3, 10, 15])
In [22]:
char_tensors.view(-1, 15).view(3, 10, -1).shape
Out[22]:
torch.Size([3, 10, 15])
In [23]:
class CharCNN(torch.nn.Module):
    def __init__(self):
        super(CharCNN, self).__init__()
        self.char_embedding = 4
        self.char_conv_features = 5
        self.char_conv_kernel = 1
        self.char_emb = torch.nn.Embedding(
            len(CHAR_VOCAB),
            self.char_embedding
        )
        self.char_conv1d = torch.nn.Conv1d(
            self.char_embedding,
            self.char_conv_features,
            self.char_conv_kernel
        )
        self.output_size = self.char_conv_features

    def forward(self, char_tensors):
        batch_size, seqlen, char_seqlen = char_tensors.size()
        char_tensors = char_tensors.view(-1, char_seqlen)
        char_tensors = self.char_emb(char_tensors)
        char_tensors = char_tensors.permute(0, 2, 1)
        char_tensors = self.char_conv1d(char_tensors)
        char_tensors = char_tensors.max(2)[0]  # Global max pool over the char dimension
        char_tensors = char_tensors.view(batch_size, seqlen, -1)
        return char_tensors
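A shape trace of CharCNN.forward as I read it (illustrative values: batch=3, seqlen=10, char_seqlen=15):

# char_tensors               (3, 10, 15)  grid of char ids, one row per word
# .view(-1, char_seqlen)     (30, 15)     fold words into the batch dimension
# self.char_emb(...)         (30, 15, 4)  per-character embeddings
# .permute(0, 2, 1)          (30, 4, 15)  Conv1d expects (batch, channels, length)
# self.char_conv1d(...)      (30, 5, 15)  kernel_size=1 keeps the length
# .max(2)[0]                 (30, 5)      global max-pool over the char axis
# .view(batch, seqlen, -1)   (3, 10, 5)   one 5-dim char feature vector per word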
In [24]:
char_tensors.shape
Out[24]:
torch.Size([3, 10, 15])
In [25]:
char_model = CharCNN()
In [26]:
char_tensors.size()
Out[26]:
torch.Size([3, 10, 15])
In [27]:
char_model(Variable(char_tensors, requires_grad=False)).size()
Out[27]:
torch.Size([3, 10, 5])
In [28]:
torch.cat((char_tensors, char_tensors), -1).size()
Out[28]:
torch.Size([3, 10, 30])
In [29]:
embeddings.max(0)
Out[29]:
In [30]:
class WordEmbeddings(torch.nn.Module):
    def __init__(
        self,
        char_model,
    ):
        super(WordEmbeddings, self).__init__()
        self.char_model = char_model
        self.word_embedding = 10
        self.word_emb = torch.nn.Embedding(
            len(VOCAB),
            self.word_embedding
        )
        self.output_size = (
            self.word_embedding
            + self.char_model.output_size
        )

    def forward(self, word_tensors, char_tensors):
        char_based_embs = self.char_model(char_tensors)
        #print(char_based_embs.size(), type(char_based_embs.data))
        word_embs = self.word_emb(word_tensors)
        #print(word_embs.size(), type(word_embs.data))
        word_embs = torch.cat(
            [word_embs, char_based_embs],
            -1
        )  # Concat word and char based embeddings
        return word_embs
In [31]:
word_model = WordEmbeddings(char_model)
In [32]:
word_tensors.size(), char_tensors.size()
Out[32]:
(torch.Size([3, 10]), torch.Size([3, 10, 15]))
In [33]:
word_model(
Variable(word_tensors, requires_grad=False),
Variable(char_tensors, requires_grad=False)
).size()
Out[33]:
torch.Size([3, 10, 15])
In [34]:
class ID_CNN(torch.nn.Module):
    """ID CNN Encoder

    Input: (batch, input_dims, seqlen)
    Output: (batch, input_dims, seqlen)
    """
    def __init__(
        self,
        input_dims,
        dilation_block_depth=5,
        field_of_view=2,
        block_stacks=2
    ):
        super(ID_CNN, self).__init__()
        # Keep the output dims the same as the input dims.
        # This allows us to recursively stack the layers.
        self.conv_features = input_dims
        self.conv_kernel = 3
        self.block_stacks = block_stacks
        self.word_char_conv1d = torch.nn.Sequential(
            *[
                torch.nn.Sequential(
                    torch.nn.Conv1d(
                        input_dims,
                        self.conv_features,
                        kernel_size=self.conv_kernel,
                        padding=field_of_view**i,
                        dilation=field_of_view**i
                    ),
                    torch.nn.ReLU()
                )
                for i in range(dilation_block_depth)
            ]
        )

    def forward(self, seq_scores):
        for block_idx in range(self.block_stacks):
            seq_scores = self.word_char_conv1d(seq_scores)
        return seq_scores


class IDCNNEncoder(torch.nn.Module):
    """IDCNNEncoder - Encodes word and char based sentence

    Input:
        word_tensors - (batch, seqlen),
        char_tensors - (batch, seqlen, char_seqlen)
    """
    def __init__(
        self,
        word_model,
    ):
        super(IDCNNEncoder, self).__init__()
        self.word_model = word_model
        self.id_cnn = ID_CNN(self.word_model.output_size)

    def forward(self, word_tensors, char_tensors):
        word_embs = self.word_model(word_tensors, char_tensors)
        word_embs = word_embs.permute(0, 2, 1)
        seq_scores = self.id_cnn(word_embs)
        return seq_scores
class IDCNNDecoder(torch.nn.Module):
    def __init__(
        self,
        input_dims,
        num_classes,
        decoder_layers=3
    ):
        super(IDCNNDecoder, self).__init__()
        self.input_dims = input_dims
        self.num_classes = num_classes
        self.decoder_layers = decoder_layers
        self.transform_layer = torch.nn.Sequential(
            torch.nn.Linear(self.input_dims, self.num_classes),
            torch.nn.ReLU()
        )
        self.create_decoder_layers()

    def create_decoder_layers(self):
        self.id_cnn = torch.nn.ModuleList(
            [
                ID_CNN(self.num_classes, block_stacks=1)
                for i in range(self.decoder_layers)
            ]
        )

    def forward(self, seq_scores):
        outputs = []
        batch, input_dims, seqlen = seq_scores.size()
        seq_scores = seq_scores.permute(0, 2, 1).contiguous()
        seq_scores = seq_scores.view(batch*seqlen, input_dims)
        seq_scores = self.transform_layer(seq_scores)
        seq_scores = seq_scores.view(batch, seqlen, self.num_classes)
        seq_scores = seq_scores.permute(0, 2, 1)
        for id_cnn in self.id_cnn:
            seq_scores = id_cnn(seq_scores)
            outputs.append(seq_scores)
        return outputs
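A quick check of the dilated-convolution arithmetic the ID_CNN blocks rely on: with kernel_size=3 and padding equal to the dilation, the sequence length is preserved, so the blocks can be stacked and reapplied. A minimal standalone sketch with made-up dimensions:

import torch
from torch.autograd import Variable

# out_len = in_len + 2*padding - dilation*(kernel_size - 1), so padding == dilation
# with kernel_size=3 keeps the sequence length fixed while the field of view grows.
x = Variable(torch.rand(3, 15, 10), requires_grad=False)  # (batch, input_dims, seqlen)
for i in range(3):
    conv = torch.nn.Conv1d(15, 15, kernel_size=3, padding=2**i, dilation=2**i)
    x = conv(x)
print(x.size())  # torch.Size([3, 15, 10]) -- seqlen preserved at every dilation level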
In [35]:
id_cnn = IDCNNEncoder(word_model)
word_tensors.size(), char_tensors.size()
Out[35]:
(torch.Size([3, 10]), torch.Size([3, 10, 15]))
In [36]:
id_cnn(
Variable(word_tensors, requires_grad=False),
Variable(char_tensors, requires_grad=False)
).size()
Out[36]:
torch.Size([3, 15, 10])
In [37]:
id_cnn_decoder = IDCNNDecoder(15, len(TAG_VOCAB))
In [38]:
decoder_outputs = id_cnn_decoder(id_cnn(
Variable(word_tensors, requires_grad=False),
Variable(char_tensors, requires_grad=False)
))
[output.size() for output in decoder_outputs]
Out[38]:
[torch.Size([3, 5, 10]), torch.Size([3, 5, 10]), torch.Size([3, 5, 10])]
In [39]:
def get_loss(decoder_outputs, target, loss_fn):
    batch, seqlen = target.size()[:2]
    #target = target.unsqueeze(2).permute(0,2,1).contiguous().view(-1, 1).squeeze()
    target = target.view(-1)
    #print(target.size())
    loss = None
    for output in decoder_outputs:
        output = output.permute(0, 2, 1).contiguous().view(-1, output.size()[1])
        #print(output.size())
        if loss is None:
            loss = loss_fn(output, target)
        else:
            loss += loss_fn(output, target)
    return loss

loss_fn = torch.nn.CrossEntropyLoss(ignore_index=0)
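The reshapes in get_loss flatten each decoder output from (batch, num_classes, seqlen) into (batch*seqlen, num_classes) rows so CrossEntropyLoss can score every position, with ignore_index=0 skipping __pad__ targets. A small sketch with stand-in values (not the real decoder output):

scores = Variable(torch.rand(3, len(TAG_VOCAB), 10), requires_grad=False)   # (batch, classes, seqlen)
targets = Variable(torch.LongTensor(3, 10).random_(0, len(TAG_VOCAB)), requires_grad=False)
flat_scores = scores.permute(0, 2, 1).contiguous().view(-1, scores.size()[1])  # (30, 5)
flat_targets = targets.view(-1)                                                # (30,)
print(loss_fn(flat_scores, flat_targets))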
In [40]:
decoder_outputs[0].permute(0,2,1).contiguous().view(-1, decoder_outputs[0].size()[1]).size()
Out[40]:
torch.Size([30, 5])
In [41]:
get_loss(decoder_outputs, Variable(tag_tensors, requires_grad=False), loss_fn)
Out[41]:
In [42]:
def train(encoder, decoder, dataloader, num_epochs, history=None):
    if history is None:
        history = []
    cuda = torch.cuda.is_available()
    if cuda:
        encoder.cuda()
        decoder.cuda()
    optimizer = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()))
    loss_fn = torch.nn.CrossEntropyLoss(ignore_index=0)
    for i in range(num_epochs):
        per_epoch_losses = []
        for batch in dataloader:
            word_tensors = Variable(batch[0], requires_grad=False)
            char_tensors = Variable(batch[1], requires_grad=False)
            tag_tensors = Variable(batch[2], requires_grad=False)
            seq_len = Variable(batch[3], requires_grad=False)
            if cuda:
                word_tensors = word_tensors.cuda()
                char_tensors = char_tensors.cuda()
                tag_tensors = tag_tensors.cuda()
            optimizer.zero_grad()
            encoding = encoder(word_tensors, char_tensors)
            outputs = decoder(encoding)
            loss = get_loss(outputs, tag_tensors, loss_fn)
            loss.backward()
            optimizer.step()
            per_epoch_losses.append(loss.data[0])
        history.append(np.mean(per_epoch_losses))
        print('epoch[%d] loss: %.4f' % (i, history[-1]))
    return history
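The notebook trains but never decodes. One plausible way to read predictions off the model, assuming tags are taken as the argmax over classes of the final decoder output (predict_tags is a hypothetical helper, not defined elsewhere in this notebook):

# Hypothetical helper: decode tag ids by taking the argmax of the last decoder output.
def predict_tags(encoder, decoder, word_tensors, char_tensors):
    encoding = encoder(
        Variable(word_tensors, requires_grad=False),
        Variable(char_tensors, requires_grad=False)
    )
    outputs = decoder(encoding)
    # outputs[-1]: (batch, num_classes, seqlen) -> (batch, seqlen) of tag ids
    return outputs[-1].max(1)[1]

# e.g. predict_tags(id_cnn, id_cnn_decoder, word_tensors, char_tensors)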
In [43]:
char_model = CharCNN()
word_model = WordEmbeddings(char_model)
id_cnn = IDCNNEncoder(word_model)
id_cnn_decoder = IDCNNDecoder(15, len(TAG_VOCAB))
history = None
In [44]:
history = train(id_cnn, id_cnn_decoder, train_loader, 10, history=history)